package com.happy3w.task.composer;

import com.happy3w.toolkits.message.MessageRecorderException;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Executes a set of {@link TaskStatusHolder}s, starting each task only after all of the holders
 * it depends on have reached {@link TaskStatus#finished}.
 *
 * <p>Tasks run either on the calling thread or on a fixed thread pool, depending on
 * {@code threadCount} and the number of tasks. Workers poll for a runnable task and sleep
 * {@code waitTime} milliseconds when none is currently available.
 *
 * <p>The {@code tasksRemainToRun} list doubles as the monitor that guards task selection and
 * status transitions performed by this class.
 */
@Slf4j
public class TaskExecutor {
    /** Tasks not yet observed as finished; also used as the lock for task selection. */
    private final List<TaskStatusHolder> tasksRemainToRun;
    private final TaskExecuteContext context;

    /** Set once a task fails or execution is interrupted; volatile so every worker sees it. */
    private volatile boolean canceled = false;

    /** Maximum number of worker threads; values of 1 or less run everything on the caller's thread. */
    @Getter
    @Setter
    private int threadCount;

    /** Milliseconds a worker sleeps when no task is currently runnable. */
    @Getter
    @Setter
    private int waitTime = 200;

    /**
     * Creates an executor over a private copy of the given tasks.
     *
     * @param tasksToRun the tasks to execute; copied, so later changes to the collection are not seen
     * @param context    the context handed to every task and queried for the result value
     */
    public TaskExecutor(Collection<TaskStatusHolder> tasksToRun, TaskExecuteContext context) {
        this.tasksRemainToRun = new ArrayList<>(tasksToRun);
        this.context = context;
        this.threadCount = defaultThreadCount();
    }

    /**
     * Builds the dependency graph items for the tasks that are still held by this executor.
     *
     * @return the result of {@link TaskStatusHolder#genDependGraphItems} for the remaining tasks
     */
    public List<DependItem> dependItemList() {
        return TaskStatusHolder.genDependGraphItems(tasksRemainToRun);
    }

    /**
     * Fluent variant of {@code setThreadCount}.
     *
     * @param threadCount maximum number of worker threads
     * @return this executor
     */
    public TaskExecutor withThreadCount(int threadCount) {
        this.threadCount = threadCount;
        return this;
    }

    /**
     * Fluent variant of {@code setWaitTime}.
     *
     * @param waitTime milliseconds a worker sleeps when no task is runnable
     * @return this executor
     */
    public TaskExecutor withWaitTime(int waitTime) {
        this.waitTime = waitTime;
        return this;
    }

    /**
     * Runs all remaining tasks and then reads a value from the context.
     *
     * <p>Tasks run on the calling thread when {@code threadCount <= 1} or at most one task
     * remains; otherwise a thread pool is used.
     *
     * @param name name of the context value to return
     * @param <T>  type the caller expects; the cast is unchecked
     * @return the value stored in the context under {@code name}
     * @throws MessageRecorderException in multi-thread mode, when a task ended in the failed status
     * @throws RuntimeException         when execution is interrupted, or (single-thread mode) the
     *                                  exception thrown by the failing task itself
     */
    @SuppressWarnings("unchecked") // the caller chooses T; the context stores untyped values
    public <T> T getDataValue(String name) {
        if (threadCount <= 1 || tasksRemainToRun.size() <= 1) {
            new TaskExecuteWorker().run();
        } else {
            runAllTaskWithMultiThread();
        }

        return (T) context.getValue(name);
    }

    /** Runs the remaining tasks on a fixed pool with one worker per thread, then checks for failures. */
    private void runAllTaskWithMultiThread() {
        ExecutorService executor = null;
        try {
            // Never start more workers than there are tasks.
            int curThreadCount = Math.min(threadCount, tasksRemainToRun.size());
            executor = Executors.newFixedThreadPool(curThreadCount);
            for (int i = 0; i < curThreadCount; i++) {
                executor.submit(new TaskExecuteWorker());
            }
            waitAllFinish(executor);

            // Exceptions thrown inside submitted workers are captured by their futures,
            // so failures are detected through the task status instead.
            ensureNoError();
        } finally {
            if (executor != null) {
                executor.shutdown();
            }
        }
    }

    /**
     * Throws for the first remaining task whose status is failed.
     *
     * @throws MessageRecorderException carrying the task outputs, message and recorded throwable
     */
    private void ensureNoError() {
        for (TaskStatusHolder holder : tasksRemainToRun) {
            if (holder.getStatus() == TaskStatus.failed) {
                throw new MessageRecorderException("Failed to run task:" + holder.getTask().getOutputs()
                        + ", with message:" + holder.getMessage(), holder.getThrowable());
            }
        }
    }

    /**
     * Blocks until every worker has terminated, logging the status of the remaining tasks
     * every 30 seconds while waiting.
     *
     * @param executor the pool the workers were submitted to
     * @throws RuntimeException wrapping the {@link InterruptedException} when the wait is interrupted
     */
    private void waitAllFinish(ExecutorService executor) {
        executor.shutdown();

        try {
            while (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                StringBuilder buf = new StringBuilder("Remain Task status:\n");
                synchronized (tasksRemainToRun) {
                    for (TaskStatusHolder holder : tasksRemainToRun) {
                        buf.append(holder.getStatus())
                                .append('\t')
                                .append(holder.getTask().outputNames())
                                .append('\n');
                    }
                }
                log.info(buf.toString());
            }
        } catch (InterruptedException e) {
            // Stop the workers instead of leaving them running behind the caller's back,
            // and keep the interrupt visible to code further up the stack.
            canceled = true;
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Computes the default worker count: available processors minus two, but at least two.
     *
     * <p>Deliberately not named {@code getThreadCount}: Lombok skips generating a getter when a
     * method with that name already exists, which would hide the field accessor.
     */
    private static int defaultThreadCount() {
        int processors = Runtime.getRuntime().availableProcessors();
        return Math.max(processors - 2, 2);
    }

    /**
     * Worker loop: repeatedly claims a task whose dependencies are finished and executes it,
     * until no tasks remain or execution is canceled.
     */
    private class TaskExecuteWorker implements Runnable {

        /**
         * Claims and runs tasks until none remain or the executor is canceled.
         *
         * <p>A task exception is recorded on its holder, cancels the executor, and is rethrown.
         */
        @Override
        public void run() {
            // NOTE(review): if the remaining tasks can never become runnable (e.g. a dependency
            // that is not part of this executor), this loop polls forever — confirm with callers.
            while (!canceled && hasRemainingTasks()) {
                TaskStatusHolder taskHolder = findNoDependTask();
                if (taskHolder == null) {
                    pauseBeforeRetry();
                    continue;
                }
                try {
                    changeStatus(taskHolder, TaskStatus.running);
                    taskHolder.getTask().execute(context);
                    changeStatus(taskHolder, TaskStatus.finished);
                } catch (Exception e) {
                    // Recorded under the same lock the other workers read the status with.
                    synchronized (tasksRemainToRun) {
                        taskHolder.configError("Unknown error", e);
                    }
                    canceled = true;
                    throw e;
                }
            }
        }

        /** Checks, under the list lock, whether any task is still held by the executor. */
        private boolean hasRemainingTasks() {
            synchronized (tasksRemainToRun) {
                return !tasksRemainToRun.isEmpty();
            }
        }

        /**
         * Updates a holder's status under the list lock, so that workers reading the status in
         * {@link #findNoDependTask()} observe the change and everything done before it.
         */
        private void changeStatus(TaskStatusHolder holder, TaskStatus status) {
            synchronized (tasksRemainToRun) {
                holder.setStatus(status);
            }
        }

        /**
         * Sleeps for {@code waitTime} milliseconds before the next polling attempt.
         *
         * @throws RuntimeException wrapping the {@link InterruptedException} when interrupted;
         *                          the executor is canceled and the interrupt flag restored first
         */
        private void pauseBeforeRetry() {
            try {
                Thread.sleep(waitTime);
            } catch (InterruptedException e) {
                canceled = true;
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        /**
         * Removes finished tasks from the list and claims the first waiting task whose
         * dependencies are all finished.
         *
         * @return the claimed task, now in the marked status, or {@code null} when none is runnable
         */
        private TaskStatusHolder findNoDependTask() {
            synchronized (tasksRemainToRun) {
                TaskStatusHolder taskToRun = null;
                Iterator<TaskStatusHolder> it = tasksRemainToRun.iterator();
                while (it.hasNext()) {
                    TaskStatusHolder holder = it.next();
                    if (holder.getStatus() == TaskStatus.finished) {
                        it.remove();
                        continue;
                    }
                    if (holder.getStatus() == TaskStatus.waiting
                            && allIsFinished(holder.getDepends())) {
                        taskToRun = holder;
                        // Leaving the waiting status while still holding the lock keeps other
                        // workers from claiming the same task.
                        taskToRun.setStatus(TaskStatus.marked);
                        break;
                    }
                }
                return taskToRun;
            }
        }

        /**
         * Tells whether every given holder is finished.
         *
         * @param holders the dependencies to check; {@code null} or empty counts as all finished
         * @return {@code true} when no holder is in a status other than finished
         */
        private boolean allIsFinished(List<TaskStatusHolder> holders) {
            if (holders == null || holders.isEmpty()) {
                return true;
            }
            for (TaskStatusHolder dependHolder : holders) {
                if (dependHolder.getStatus() != TaskStatus.finished) {
                    return false;
                }
            }
            return true;
        }
    }

}
